// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.

package filestore

import (
	"bytes"
	"context"
	"encoding/base32"
	"errors"
	"fmt"
	"io"
	"io/fs"
	"os"
	"path/filepath"
	"strings"
	"time"

	"github.com/zeebo/errs"
	"go.uber.org/zap"

	"storj.io/common/experiment"
	"storj.io/common/storj"
	"storj.io/storj/storagenode/blobstore"
)

const (
	blobPermission = 0600
	dirPermission  = 0700

	v0PieceFileSuffix      = ""
	v1PieceFileSuffix      = ".sj1"
	unknownPieceFileSuffix = "/..error_unknown_format../"
	verificationFileName   = "storage-dir-verification"
)

var pathEncoding = base32.NewEncoding("abcdefghijklmnopqrstuvwxyz234567").WithPadding(base32.NoPadding)

// Dir represents single folder for storing blobs.
type Dir struct {
	log  *zap.Logger
	path string
}

// OpenDir opens existing folder for storing blobs.
func OpenDir(log *zap.Logger, path string) (*Dir, error) {
	dir := &Dir{
		log:  log,
		path: path,
	}

	stat := func(path string) error {
		_, err := os.Stat(path)
		return err
	}

	return dir, errs.Combine(
		stat(dir.blobsdir()),
		stat(dir.tempdir()),
		stat(dir.trashdir()),
	)
}

// NewDir returns folder for storing blobs.
func NewDir(log *zap.Logger, path string) (*Dir, error) {
	dir := &Dir{
		log:  log,
		path: path,
	}

	return dir, errs.Combine(
		os.MkdirAll(dir.blobsdir(), dirPermission),
		os.MkdirAll(dir.tempdir(), dirPermission),
		os.MkdirAll(dir.trashdir(), dirPermission),
	)
}

// Path returns the directory path.
func (dir *Dir) Path() string { return dir.path }

// blobsdir is the sub-directory containing the blobs.
func (dir *Dir) blobsdir() string { return filepath.Join(dir.path, "blobs") }

// tempdir is used for temp files prior to being moved into blobsdir.
func (dir *Dir) tempdir() string { return filepath.Join(dir.path, "temp") }

// trashdir contains files staged for deletion for a period of time.
func (dir *Dir) trashdir() string { return filepath.Join(dir.path, "trash") }

// CreateVerificationFile creates a file to be used for storage directory verification.
func (dir *Dir) CreateVerificationFile(ctx context.Context, id storj.NodeID) error {
	f, err := os.Create(filepath.Join(dir.path, verificationFileName))
	if err != nil {
		return err
	}
	defer func() {
		err = errs.Combine(err, f.Close())
	}()
	_, err = f.Write(id.Bytes())
	return err
}

// Verify verifies that the storage directory is correct by checking for the existence and validity
// of the verification file.
func (dir *Dir) Verify(ctx context.Context, id storj.NodeID) error {
	content, err := os.ReadFile(filepath.Join(dir.path, verificationFileName))
	if err != nil {
		return err
	}

	if !bytes.Equal(content, id.Bytes()) {
		verifyID, err := storj.NodeIDFromBytes(content)
		if err != nil {
			return errs.New("content of file is not a valid node ID: %x", content)
		}
		return errs.New("node ID in file (%s) does not match running node's ID (%s)", verifyID, id.String())
	}
	return nil
}

// CreateTemporaryFile creates a preallocated temporary file in the temp directory
// prealloc preallocates file to make writing faster.
func (dir *Dir) CreateTemporaryFile(ctx context.Context, prealloc int64) (_ *os.File, err error) {
	const preallocLimit = 5 << 20 // 5 MB
	if prealloc > preallocLimit {
		prealloc = preallocLimit
	}

	file, err := os.CreateTemp(dir.tempdir(), "blob-*.partial")
	if err != nil {
		return nil, err
	}

	if prealloc >= 0 {
		if err := file.Truncate(prealloc); err != nil {
			return nil, errs.Combine(err, file.Close())
		}
	}
	return file, nil
}

// DeleteTemporary deletes a temporary file.
func (dir *Dir) DeleteTemporary(ctx context.Context, file *os.File) (err error) {
	defer mon.Task()(&ctx)(&err)
	closeErr := file.Close()
	return errs.Combine(closeErr, os.Remove(file.Name()))
}

// blobToBasePath converts a blob reference to a filepath in permanent storage. This may not be the
// entire path; blobPathForFormatVersion() must also be used. This is a separate call because this
// part of the filepath is constant, and blobPathForFormatVersion may need to be called multiple
// times with different storage.FormatVersion values.
func (dir *Dir) blobToBasePath(ref blobstore.BlobRef) (string, error) {
	return dir.refToDirPath(ref, dir.blobsdir())
}

// refToDirPath converts a blob reference to a filepath in the specified sub-directory.
func (dir *Dir) refToDirPath(ref blobstore.BlobRef, subDir string) (string, error) {
	if !ref.IsValid() {
		return "", blobstore.ErrInvalidBlobRef.New("")
	}

	namespace := pathEncoding.EncodeToString(ref.Namespace)
	key := pathEncoding.EncodeToString(ref.Key)
	if len(key) < 3 {
		// ensure we always have enough characters to split [:2] and [2:]
		key = "11" + key
	}
	return filepath.Join(subDir, namespace, key[:2], key[2:]), nil
}

// blobPathForFormatVersion adjusts a bare blob path (as might have been generated by a call to
// blobToBasePath()) to what it should be for the given storage format version.
func blobPathForFormatVersion(path string, formatVersion blobstore.FormatVersion) string {
	switch formatVersion {
	case FormatV0:
		return path + v0PieceFileSuffix
	case FormatV1:
		return path + v1PieceFileSuffix
	}
	return path + unknownPieceFileSuffix
}

// Commit commits the temporary file to permanent storage.
func (dir *Dir) Commit(ctx context.Context, file *os.File, ref blobstore.BlobRef, formatVersion blobstore.FormatVersion) (err error) {
	defer mon.Task()(&ctx)(&err)
	position, seekErr := file.Seek(0, io.SeekCurrent)
	truncErr := file.Truncate(position)

	var syncErr error
	if !experiment.Has(ctx, "nosync") {
		syncErr = file.Sync()
	}

	chmodErr := os.Chmod(file.Name(), blobPermission)
	closeErr := file.Close()

	if seekErr != nil || truncErr != nil || syncErr != nil || chmodErr != nil || closeErr != nil {
		removeErr := os.Remove(file.Name())
		return errs.Combine(seekErr, truncErr, syncErr, chmodErr, closeErr, removeErr)
	}

	path, err := dir.blobToBasePath(ref)
	if err != nil {
		removeErr := os.Remove(file.Name())
		return errs.Combine(err, removeErr)
	}
	path = blobPathForFormatVersion(path, formatVersion)

	mkdirErr := os.MkdirAll(filepath.Dir(path), dirPermission)
	if os.IsExist(mkdirErr) {
		mkdirErr = nil
	}

	if mkdirErr != nil {
		removeErr := os.Remove(file.Name())
		return errs.Combine(mkdirErr, removeErr)
	}

	renameErr := rename(file.Name(), path)
	if renameErr != nil {
		removeErr := os.Remove(file.Name())
		return errs.Combine(renameErr, removeErr)
	}

	return nil
}

// Open opens the file with the specified ref. It may need to check in more than one location in
// order to find the blob, if it was stored with an older version of the storage node software.
// In cases where the storage format version of a blob is already known, OpenWithStorageFormat()
// will generally be a better choice.
func (dir *Dir) Open(ctx context.Context, ref blobstore.BlobRef) (_ *os.File, _ blobstore.FormatVersion, err error) {
	defer mon.Task()(&ctx)(&err)
	path, err := dir.blobToBasePath(ref)
	if err != nil {
		return nil, FormatV0, err
	}
	for formatVer := MaxFormatVersionSupported; formatVer >= MinFormatVersionSupported; formatVer-- {
		vPath := blobPathForFormatVersion(path, formatVer)
		file, err := openFileReadOnly(vPath, blobPermission)
		if err == nil {
			return file, formatVer, nil
		}
		if !os.IsNotExist(err) {
			return nil, FormatV0, Error.New("unable to open %q: %v", vPath, err)
		}
	}
	return nil, FormatV0, os.ErrNotExist
}

// OpenWithStorageFormat opens an already-located blob file with a known storage format version,
// which avoids the potential need to search through multiple storage formats to find the blob.
func (dir *Dir) OpenWithStorageFormat(ctx context.Context, ref blobstore.BlobRef, formatVer blobstore.FormatVersion) (_ *os.File, err error) {
	defer mon.Task()(&ctx)(&err)
	path, err := dir.blobToBasePath(ref)
	if err != nil {
		return nil, err
	}
	vPath := blobPathForFormatVersion(path, formatVer)
	file, err := openFileReadOnly(vPath, blobPermission)
	if err == nil {
		return file, nil
	}
	if os.IsNotExist(err) {
		return nil, err
	}
	return nil, Error.New("unable to open %q: %v", vPath, err)
}

// Stat looks up disk metadata on the blob file. It may need to check in more than one location
// in order to find the blob, if it was stored with an older version of the storage node software.
// In cases where the storage format version of a blob is already known, StatWithStorageFormat()
// will generally be a better choice.
func (dir *Dir) Stat(ctx context.Context, ref blobstore.BlobRef) (_ blobstore.BlobInfo, err error) {
	defer mon.Task()(&ctx)(&err)
	path, err := dir.blobToBasePath(ref)
	if err != nil {
		return nil, err
	}
	for formatVer := MaxFormatVersionSupported; formatVer >= MinFormatVersionSupported; formatVer-- {
		vPath := blobPathForFormatVersion(path, formatVer)
		stat, err := os.Stat(vPath)
		if err == nil {
			return newBlobInfo(ref, vPath, stat, formatVer), nil
		}
		if !os.IsNotExist(err) {
			return nil, Error.New("unable to stat %q: %v", vPath, err)
		}
	}
	return nil, os.ErrNotExist
}

// StatWithStorageFormat looks up disk metadata on the blob file with the given storage format
// version. This avoids the need for checking for the file in multiple different storage format
// types.
func (dir *Dir) StatWithStorageFormat(ctx context.Context, ref blobstore.BlobRef, formatVer blobstore.FormatVersion) (_ blobstore.BlobInfo, err error) {
	defer mon.Task()(&ctx)(&err)
	path, err := dir.blobToBasePath(ref)
	if err != nil {
		return nil, err
	}
	vPath := blobPathForFormatVersion(path, formatVer)
	stat, err := os.Stat(vPath)
	if err == nil {
		return newBlobInfo(ref, vPath, stat, formatVer), nil
	}
	if os.IsNotExist(err) {
		return nil, err
	}
	return nil, Error.New("unable to stat %q: %v", vPath, err)
}

// Trash moves the blob specified by ref to the trash for every format version.
func (dir *Dir) Trash(ctx context.Context, ref blobstore.BlobRef, timestamp time.Time) (err error) {
	defer mon.Task()(&ctx)(&err)
	return dir.iterateStorageFormatVersions(ctx, ref, func(ctx context.Context, ref blobstore.BlobRef, formatVersion blobstore.FormatVersion) error {
		return dir.TrashWithStorageFormat(ctx, ref, formatVersion, timestamp)
	})
}

// TrashWithStorageFormat moves the blob specified by ref to the trash for the specified format version.
func (dir *Dir) TrashWithStorageFormat(ctx context.Context, ref blobstore.BlobRef, formatVer blobstore.FormatVersion, timestamp time.Time) (err error) {
	blobsBasePath, err := dir.blobToBasePath(ref)
	if err != nil {
		return err
	}

	blobsVerPath := blobPathForFormatVersion(blobsBasePath, formatVer)

	trashBasePath, err := dir.refToDirPath(ref, dir.trashdir())
	if err != nil {
		return err
	}

	trashVerPath := blobPathForFormatVersion(trashBasePath, formatVer)

	// ensure the dirs exist for trash path
	err = os.MkdirAll(filepath.Dir(trashVerPath), dirPermission)
	if err != nil && !os.IsExist(err) {
		return err
	}

	// Change mtime to the logical time of removal. This allows us to check the
	// mtime to know how long the file has been in the trash. If the file is
	// restored this may make it take longer to be trashed again, but the
	// simplicity is worth the trade-off.
	//
	// We change the mtime prior to moving the file so that if this call fails
	// the file will not be in the trash with an unmodified mtime, which could
	// result in its permanent deletion too soon.
	err = os.Chtimes(blobsVerPath, timestamp, timestamp)
	if os.IsNotExist(err) {
		return nil
	}
	if err != nil {
		return err
	}

	// move to trash
	err = rename(blobsVerPath, trashVerPath)
	if os.IsNotExist(err) {
		// no blob at that path; either it has a different storage format
		// version or there was a concurrent call. (This function is expected
		// by callers to return a nil error in the case of concurrent calls.)
		return nil
	}
	return err
}

// RestoreTrash moves every blob in the trash folder back into blobsdir.
func (dir *Dir) RestoreTrash(ctx context.Context, namespace []byte) (keysRestored [][]byte, err error) {
	var errorsEncountered errs.Group
	err = dir.walkNamespaceInPath(ctx, namespace, dir.trashdir(), func(info blobstore.BlobInfo) error {
		blobsBasePath, err := dir.blobToBasePath(info.BlobRef())
		if err != nil {
			errorsEncountered.Add(err)
			return nil
		}

		blobsVerPath := blobPathForFormatVersion(blobsBasePath, info.StorageFormatVersion())

		trashBasePath, err := dir.refToDirPath(info.BlobRef(), dir.trashdir())
		if err != nil {
			errorsEncountered.Add(err)
			return nil
		}

		trashVerPath := blobPathForFormatVersion(trashBasePath, info.StorageFormatVersion())

		// ensure the dirs exist for blobs path
		err = os.MkdirAll(filepath.Dir(blobsVerPath), dirPermission)
		if err != nil && !os.IsExist(err) {
			errorsEncountered.Add(err)
			return nil
		}

		// move back to blobsdir
		err = rename(trashVerPath, blobsVerPath)
		if os.IsNotExist(err) {
			// no blob at that path; either it has a different storage format
			// version or there was a concurrent call. (This function is expected
			// by callers to return a nil error in the case of concurrent calls.)
			return nil
		}
		if err != nil {
			errorsEncountered.Add(err)
			return nil
		}

		keysRestored = append(keysRestored, info.BlobRef().Key)
		return nil
	})
	errorsEncountered.Add(err)
	return keysRestored, errorsEncountered.Err()
}

// TryRestoreTrashBlob attempts to restore a blob from the trash if it exists.
// It returns nil if the blob was restored, or an error if the blob was not
// in the trash or could not be restored.
func (dir *Dir) TryRestoreTrashBlob(ctx context.Context, ref blobstore.BlobRef) (err error) {
	defer mon.Task()(&ctx)(&err)

	blobsBasePath, err := dir.blobToBasePath(ref)
	if err != nil {
		return err
	}

	trashBasePath, err := dir.refToDirPath(ref, dir.trashdir())
	if err != nil {
		return err
	}

	// ensure the dirs exist for blobs path
	blobsVerPath := blobPathForFormatVersion(blobsBasePath, MaxFormatVersionSupported)
	err = os.MkdirAll(filepath.Dir(blobsVerPath), dirPermission)
	if err != nil && !errors.Is(err, fs.ErrExist) {
		return err
	}

	trashVerPath := blobPathForFormatVersion(trashBasePath, MaxFormatVersionSupported)

	// move back to blobsdir
	return rename(trashVerPath, blobsVerPath)
}

// EmptyTrash walks the trash files for the given namespace and deletes any
// file whose mtime is older than trashedBefore. The mtime is modified when
// Trash is called.
func (dir *Dir) EmptyTrash(ctx context.Context, namespace []byte, trashedBefore time.Time) (bytesEmptied int64, deletedKeys [][]byte, minPieceTimestamp time.Time, err error) {
	defer mon.Task()(&ctx)(&err)
	var errorsEncountered errs.Group
	minPieceTimestamp = time.Now()
	err = dir.walkNamespaceInPath(ctx, namespace, dir.trashdir(), func(info blobstore.BlobInfo) error {
		fileInfo, err := info.Stat(ctx)
		if err != nil {
			if os.IsNotExist(err) {
				return nil
			}
			if errors.Is(err, ErrIsDir) {
				return nil
			}
			if thisBlobInfo, ok := info.(*blobInfo); ok {
				errorsEncountered.Add(Error.New("%s: %w", thisBlobInfo.path, err))
			} else {
				// if this happens, it's time to extend the code to handle the other type
				errorsEncountered.Add(Error.New("%+v [unexpected type %T]: %w", info, info, err))
			}
			return nil
		}

		mtime := fileInfo.ModTime()
		if mtime.Before(trashedBefore) {
			err = dir.deleteWithStorageFormatInPath(ctx, dir.trashdir(), info.BlobRef(), info.StorageFormatVersion())
			if err != nil {
				errorsEncountered.Add(err)
				return nil
			}
			deletedKeys = append(deletedKeys, info.BlobRef().Key)
			bytesEmptied += fileInfo.Size()
		} else if mtime.Before(minPieceTimestamp) {
			minPieceTimestamp = mtime
		}
		return nil
	})
	errorsEncountered.Add(err)
	return bytesEmptied, deletedKeys, minPieceTimestamp, errorsEncountered.Err()
}

// DeleteTrashNamespace deletes the entire trash namespace.
func (dir *Dir) DeleteTrashNamespace(ctx context.Context, namespace []byte) (err error) {
	mon.Task()(&ctx)(&err)
	return dir.deleteNamespace(ctx, dir.trashdir(), namespace)
}

// iterateStorageFormatVersions executes f for all storage format versions,
// starting with the oldest format version. It is more likely, in the general
// case, that we will find the blob with the newest format version instead,
// but if we iterate backward here then we run the risk of a race condition:
// the blob might have existed with _SomeOldVer before the call, and could
// then have been updated atomically with _MaxVer concurrently while we were
// iterating. If we iterate _forwards_, this race should not occur because it
// is assumed that blobs are never rewritten with an _older_ storage format
// version.
//
// f will be executed for every storage format version regardless of the
// result, and will aggregate errors into a single returned error.
func (dir *Dir) iterateStorageFormatVersions(ctx context.Context, ref blobstore.BlobRef, f func(ctx context.Context, ref blobstore.BlobRef, i blobstore.FormatVersion) error) (err error) {
	defer mon.Task()(&ctx)(&err)
	var combinedErrors errs.Group
	for i := MinFormatVersionSupported; i <= MaxFormatVersionSupported; i++ {
		combinedErrors.Add(f(ctx, ref, i))
	}
	return combinedErrors.Err()
}

// Delete deletes blobs with the specified ref (in all supported storage formats).
//
// It doesn't return an error if the blob is not found for any reason.
func (dir *Dir) Delete(ctx context.Context, ref blobstore.BlobRef) (err error) {
	defer mon.Task()(&ctx)(&err)
	return dir.iterateStorageFormatVersions(ctx, ref, dir.DeleteWithStorageFormat)
}

// DeleteWithStorageFormat deletes the blob with the specified ref for one
// specific format version.
//
// It doesn't return an error if the blob isn't found for any reason.
func (dir *Dir) DeleteWithStorageFormat(ctx context.Context, ref blobstore.BlobRef, formatVer blobstore.FormatVersion) (err error) {
	defer mon.Task()(&ctx)(&err)
	return dir.deleteWithStorageFormatInPath(ctx, dir.blobsdir(), ref, formatVer)
}

// DeleteNamespace deletes blobs folder for a specific namespace.
func (dir *Dir) DeleteNamespace(ctx context.Context, ref []byte) (err error) {
	defer mon.Task()(&ctx)(&err)
	return dir.deleteNamespace(ctx, dir.blobsdir(), ref)
}

func (dir *Dir) deleteWithStorageFormatInPath(ctx context.Context, path string, ref blobstore.BlobRef, formatVer blobstore.FormatVersion) (err error) {
	defer mon.Task()(&ctx)(&err)

	pathBase, err := dir.refToDirPath(ref, path)
	if err != nil {
		return err
	}

	verPath := blobPathForFormatVersion(pathBase, formatVer)

	// try removing the file
	err = os.Remove(verPath)

	// ignore concurrent deletes
	if os.IsNotExist(err) {
		// something is happening at the same time as this; possibly a
		// concurrent delete, or possibly a rewrite of the blob.
		return nil
	}

	return err
}

// deleteNamespace deletes folder with everything inside.
func (dir *Dir) deleteNamespace(ctx context.Context, path string, ref []byte) (err error) {
	defer mon.Task()(&ctx)(&err)

	namespace := pathEncoding.EncodeToString(ref)
	folderPath := filepath.Join(path, namespace)

	err = os.RemoveAll(folderPath)
	return err
}

const nameBatchSize = 1024

// ListNamespaces finds all known namespace IDs in use in local storage. They are not
// guaranteed to contain any blobs.
func (dir *Dir) ListNamespaces(ctx context.Context) (ids [][]byte, err error) {
	defer mon.Task()(&ctx)(&err)
	return dir.listNamespacesInPath(ctx, dir.blobsdir())
}

func (dir *Dir) listNamespacesInPath(ctx context.Context, path string) (ids [][]byte, err error) {
	defer mon.Task()(&ctx)(&err)
	openDir, err := os.Open(path)
	if err != nil {
		return nil, err
	}
	defer func() { err = errs.Combine(err, openDir.Close()) }()
	for {
		dirNames, err := openDir.Readdirnames(nameBatchSize)
		if err != nil {
			if errors.Is(err, io.EOF) || os.IsNotExist(err) {
				return ids, nil
			}
			return ids, err
		}
		if len(dirNames) == 0 {
			return ids, nil
		}
		for _, name := range dirNames {
			namespace, err := pathEncoding.DecodeString(name)
			if err != nil {
				// just an invalid directory entry, and not a namespace. probably
				// don't need to pass on this error
				continue
			}
			ids = append(ids, namespace)
		}
	}
}

// WalkNamespace executes walkFunc for each locally stored blob, stored with storage format V1 or
// greater, in the given namespace. If walkFunc returns a non-nil error, WalkNamespace will stop
// iterating and return the error immediately. The ctx parameter is intended specifically to allow
// canceling iteration early.
func (dir *Dir) WalkNamespace(ctx context.Context, namespace []byte, walkFunc func(blobstore.BlobInfo) error) (err error) {
	defer mon.Task()(&ctx)(&err)
	return dir.walkNamespaceInPath(ctx, namespace, dir.blobsdir(), walkFunc)
}

func (dir *Dir) walkNamespaceInPath(ctx context.Context, namespace []byte, path string, walkFunc func(blobstore.BlobInfo) error) (err error) {
	defer mon.Task()(&ctx)(&err)
	namespaceDir := pathEncoding.EncodeToString(namespace)
	nsDir := filepath.Join(path, namespaceDir)
	openDir, err := os.Open(nsDir)
	if err != nil {
		if os.IsNotExist(err) {
			dir.log.Debug("directory not found", zap.String("dir", nsDir))
			// job accomplished: there are no blobs in this namespace!
			return nil
		}
		return err
	}
	defer func() { err = errs.Combine(err, openDir.Close()) }()
	for {
		// check for context done both before and after our readdir() call
		if err := ctx.Err(); err != nil {
			return err
		}
		subdirNames, err := openDir.Readdirnames(nameBatchSize)
		if err != nil {
			if errors.Is(err, io.EOF) || os.IsNotExist(err) {
				return nil
			}
			return err
		}
		if len(subdirNames) == 0 {
			return nil
		}
		if err := ctx.Err(); err != nil {
			return err
		}
		for _, keyPrefix := range subdirNames {
			if len(keyPrefix) != 2 {
				// just an invalid subdir; could be garbage of many kinds. probably
				// don't need to pass on this error
				continue
			}
			err := walkNamespaceWithPrefix(ctx, dir.log, namespace, nsDir, keyPrefix, walkFunc)
			if err != nil {
				return err
			}
		}
	}
}

func decodeBlobInfo(namespace []byte, keyPrefix, keyDir, name string) (info blobstore.BlobInfo, ok bool) {
	blobFileName := name
	encodedKey := keyPrefix + blobFileName
	formatVer := FormatV0
	if strings.HasSuffix(blobFileName, v1PieceFileSuffix) {
		formatVer = FormatV1
		encodedKey = encodedKey[0 : len(encodedKey)-len(v1PieceFileSuffix)]
	}
	key, err := pathEncoding.DecodeString(encodedKey)
	if err != nil {
		return nil, false
	}
	ref := blobstore.BlobRef{
		Namespace: namespace,
		Key:       key,
	}
	return newBlobInfo(ref, filepath.Join(keyDir, blobFileName), nil, formatVer), true
}

func walkNamespaceWithPrefix(ctx context.Context, log *zap.Logger, namespace []byte, nsDir, keyPrefix string, walkFunc func(blobstore.BlobInfo) error) (err error) {
	keyDir := filepath.Join(nsDir, keyPrefix)
	openDir, err := os.Open(keyDir)
	if err != nil {
		return err
	}
	defer func() { err = errs.Combine(err, openDir.Close()) }()
	for {
		// check for context done both before and after our readdir() call
		if err := ctx.Err(); err != nil {
			return err
		}
		names, err := openDir.Readdirnames(nameBatchSize)
		if err != nil && !errors.Is(err, io.EOF) {
			return err
		}
		if os.IsNotExist(err) || len(names) == 0 {
			return nil
		}
		if err := ctx.Err(); err != nil {
			return err
		}
		for _, name := range names {
			blobInfo, ok := decodeBlobInfo(namespace, keyPrefix, keyDir, name)
			if !ok {
				continue
			}
			err = walkFunc(blobInfo)
			if err != nil {
				return err
			}
			// also check for context done between every walkFunc callback.
			if err := ctx.Err(); err != nil {
				return err
			}
		}
	}
}

// Info returns information about the current state of the dir.
func (dir *Dir) Info(ctx context.Context) (blobstore.DiskInfo, error) {
	path, err := filepath.Abs(dir.path)
	if err != nil {
		return blobstore.DiskInfo{}, err
	}
	return diskInfoFromPath(path)
}

type blobInfo struct {
	ref           blobstore.BlobRef
	path          string
	fileInfo      os.FileInfo
	formatVersion blobstore.FormatVersion
}

func newBlobInfo(ref blobstore.BlobRef, path string, fileInfo os.FileInfo, formatVer blobstore.FormatVersion) blobstore.BlobInfo {
	return &blobInfo{
		ref:           ref,
		path:          path,
		fileInfo:      fileInfo,
		formatVersion: formatVer,
	}
}

func (info *blobInfo) BlobRef() blobstore.BlobRef {
	return info.ref
}

func (info *blobInfo) StorageFormatVersion() blobstore.FormatVersion {
	return info.formatVersion
}

func (info *blobInfo) Stat(ctx context.Context) (os.FileInfo, error) {
	if info.fileInfo == nil {
		fileInfo, err := os.Lstat(info.path)
		if err != nil {
			if os.IsNotExist(err) {
				return nil, err
			}
			if isLowLevelCorruptionError(err) {
				return nil, &CorruptDataError{path: info.path, error: err}
			}
			return nil, err
		}
		if fileInfo.Mode().IsDir() {
			return fileInfo, ErrIsDir
		}
		info.fileInfo = fileInfo
	}
	return info.fileInfo, nil
}

func (info *blobInfo) FullPath(ctx context.Context) (string, error) {
	return info.path, nil
}

// CorruptDataError represents a filesystem or disk error which indicates data corruption.
//
// We use a custom error type here so that we can add explanatory information and wrap the original
// error at the same time.
type CorruptDataError struct {
	path  string
	error error
}

// Unwrap unwraps the error.
func (cde CorruptDataError) Unwrap() error {
	return cde.error
}

// Path returns the path at which the error was encountered.
func (cde CorruptDataError) Path() string {
	return cde.path
}

// Error returns an error string describing the condition.
func (cde CorruptDataError) Error() string {
	return fmt.Sprintf("unrecoverable error accessing data on the storage file system (path=%v; error=%v). This is most likely due to disk bad sectors or a corrupted file system. Check your disk for bad sectors and integrity", cde.path, cde.error)
}
